package com.test;

import com.alibaba.fastjson.JSON;
import com.test.business.FraudDetector;
import com.test.entry.Order;
import com.test.sink.FraudSink;
import com.test.source.RocketMQConsumer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FaudCheckJob {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> streamSource = env
                .addSource(new RocketMQConsumer("TopicTest"))
                .name("rocketMQ source");

        DataStream<String> dataStream = streamSource.map(s -> JSON.parseObject(s, Order.class))
                .keyBy(Order::getAccountNo)
                .process(new FraudDetector())
                .name("防欺诈逻辑");


        dataStream
                .addSink(new FraudSink())
                .name("打印危险帐号");

        env.execute("防欺诈测试例子");

    }
}
